/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.core.io;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.Arrays;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Simple serialization / deserialization methods for the {@link SimpleVersionedSerializer}.
 */
@PublicEvolving
public class SimpleVersionedSerialization {

	/**
	 * Serializes the version and datum into a stream.
	 *
	 * <p>Data serialized via this method can be deserialized via
	 * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, DataInputView)}.
	 *
	 * <p>The first four bytes will be occupied by the version, as returned by
	 * {@link SimpleVersionedSerializer#getVersion()}. The remaining bytes will be the serialized
	 * datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}, plus its length.
	 * The resulting array will hence be eight bytes larger than the serialized datum.
	 *
	 * @param serializer The serializer to serialize the datum with.
	 * @param datum The datum to serialize.
	 * @param out The stream to serialize to.
	 */
	public static <T> void writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out) throws IOException {
		checkNotNull(serializer, "serializer");
		checkNotNull(datum, "datum");
		checkNotNull(out, "out");

		final byte[] data = serializer.serialize(datum);

		out.writeInt(serializer.getVersion());
		out.writeInt(data.length);
		out.write(data);
	}

	/**
	 * Deserializes the version and datum from a stream.
	 *
	 * <p>This method deserializes data serialized via
	 * {@link #writeVersionAndSerialize(SimpleVersionedSerializer, Object, DataOutputView)}.
	 *
	 * <p>The first four bytes will be interpreted as the version. The next four bytes will be
	 * interpreted as the length of the datum bytes, then length-many bytes will be read.
	 * Finally, the datum is deserialized via the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
	 * method.
	 *
	 * @param serializer The serializer to serialize the datum with.
	 * @param in The stream to deserialize from.
	 */
	public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, DataInputView in) throws IOException {
		checkNotNull(serializer, "serializer");
		checkNotNull(in, "in");

		final int version = in.readInt();
		final int length = in.readInt();
		final byte[] data = new byte[length];
		in.readFully(data);

		return serializer.deserialize(version, data);
	}

	/**
	 * Serializes the version and datum into a byte array. The first four bytes will be occupied by
	 * the version (as returned by {@link SimpleVersionedSerializer#getVersion()}),
	 * written in <i>big-endian</i> encoding. The remaining bytes will be the serialized
	 * datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}. The resulting array
	 * will hence be four bytes larger than the serialized datum.
	 *
	 * <p>Data serialized via this method can be deserialized via
	 * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, byte[])}.
	 *
	 * @param serializer The serializer to serialize the datum with.
	 * @param datum The datum to serialize.
	 *
	 * @return A byte array containing the serialized version and serialized datum.
	 *
	 * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#serialize(Object)}
	 *                     method are forwarded.
	 */
	public static <T> byte[] writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum) throws IOException {
		checkNotNull(serializer, "serializer");
		checkNotNull(datum, "datum");

		final byte[] data = serializer.serialize(datum);
		final byte[] versionAndData = new byte[data.length + 8];

		final int version = serializer.getVersion();
		versionAndData[0] = (byte) (version >> 24);
		versionAndData[1] = (byte) (version >> 16);
		versionAndData[2] = (byte) (version >>  8);
		versionAndData[3] = (byte)  version;

		final int length = data.length;
		versionAndData[4] = (byte) (length >> 24);
		versionAndData[5] = (byte) (length >> 16);
		versionAndData[6] = (byte) (length >>  8);
		versionAndData[7] = (byte)  length;

		// move the data to the array
		System.arraycopy(data, 0, versionAndData, 8, data.length);

		return versionAndData;
	}

	/**
	 * Deserializes the version and datum from a byte array. The first four bytes will be read as
	 * the version, in <i>big-endian</i> encoding. The remaining bytes will be passed to the serializer
	 * for deserialization, via {@link SimpleVersionedSerializer#deserialize(int, byte[])}.
	 *
	 * @param serializer The serializer to deserialize the datum with.
	 * @param bytes The bytes to deserialize from.
	 *
	 * @return The deserialized datum.
	 *
	 * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
	 *                     method are forwarded.
	 */
	public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
		checkNotNull(serializer, "serializer");
		checkNotNull(bytes, "bytes");
		checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");

		final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length);
		final int version =
				((bytes[0] & 0xff) << 24) |
				((bytes[1] & 0xff) << 16) |
				((bytes[2] & 0xff) <<  8) |
				(bytes[3] & 0xff);

		final int length =
				((bytes[4] & 0xff) << 24) |
				((bytes[5] & 0xff) << 16) |
				((bytes[6] & 0xff) <<  8) |
				(bytes[7] & 0xff);

		if (length == dataOnly.length) {
			return serializer.deserialize(version, dataOnly);
		}
		else {
			throw new IOException("Corrupt data, conflicting lengths. Length fields: " + length + ", data: " + dataOnly.length);
		}
	}

	// ------------------------------------------------------------------------

	/** Utility class, not meant to be instantiated. */
	private SimpleVersionedSerialization() {}
}
